/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.data.service.segments.records.merge;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.type.timeline.Timeline;
import org.unidata.mdm.data.context.AbstractRecordIdentityContext;
import org.unidata.mdm.data.context.GetRecordTimelineRequestContext;
import org.unidata.mdm.data.context.MergeRequestContext;
import org.unidata.mdm.data.dto.MergeRecordsDTO;
import org.unidata.mdm.data.exception.DataExceptionIds;
import org.unidata.mdm.data.exception.DataProcessingException;
import org.unidata.mdm.data.module.DataModule;
import org.unidata.mdm.data.service.impl.CommonRecordsComponent;
import org.unidata.mdm.data.type.apply.RecordMergeChangeSet;
import org.unidata.mdm.data.type.data.OriginRecord;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.data.util.RecordFactoryUtils;
import org.unidata.mdm.system.type.pipeline.Start;
import org.unidata.mdm.system.type.runtime.MeasurementPoint;

/**
 * Merge pipeline start segment: resolves and validates the master record and the
 * duplicate records (keys and timelines) before the merge pipeline proceeds,
 * failing fast on missing, ambiguous or inactive records.
 *
 * @author Mikhail Mikhailov
 *         Merge validator.
 */
@Component(RecordMergeStartExecutor.SEGMENT_ID)
public class RecordMergeStartExecutor extends Start<MergeRequestContext, MergeRecordsDTO> {
    /**
     * Logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(RecordMergeStartExecutor.class);
    /**
     * Common component, used to load record timelines by keys or by identity context.
     */
    @Autowired
    private CommonRecordsComponent commonRecordsComponent;
    /**
     * This segment ID.
     */
    public static final String SEGMENT_ID = DataModule.MODULE_ID + "[RECORD_MERGE_START]";
    /**
     * Localized message code (segment description).
     */
    public static final String SEGMENT_DESCRIPTION = DataModule.MODULE_ID + ".record.merge.start.description";
    /**
     * Constructor.
     */
    public RecordMergeStartExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION, MergeRequestContext.class, MergeRecordsDTO.class);
    }

    /**
     * {@inheritDoc}
     * Performs the one-time context setup (key/timeline resolution) under a measurement point.
     */
    @Override
    public void start(MergeRequestContext ctx) {
        MeasurementPoint.start();
        try {
            // No-op if the context was already set up (see setup()'s guard).
            setup(ctx);
        } finally {
            MeasurementPoint.stop();
        }
    }

    /**
     * {@inheritDoc}
     * Runs setup first, so that the master keys are resolved (or an exception is
     * thrown) before the entity name is read from them.
     */
    @Override
    public String subject(MergeRequestContext ctx) {
        MeasurementPoint.start();
        try {
            setup(ctx);
            RecordKeys master = ctx.keys();
            return master.getEntityName();
        } finally {
            MeasurementPoint.stop();
        }
    }

    /**
     * One-time context preparation: resolves duplicates, then the master record,
     * then default fields. Guarded by the context's setUp flag, so repeated calls
     * (e.g. from both {@link #start} and {@link #subject}) are no-ops.
     * Order matters: setupMaster may promote one of the duplicates resolved in
     * setupDuplicates to master.
     */
    private void setup(MergeRequestContext ctx) {

        if (ctx.setUp()) {
            return;
        }

        // 1. Collect duplicates keys
        setupDuplicates(ctx);

        // 2. Setup master keys, possibly selecting one of the duplicates.
        setupMaster(ctx);

        // 3. Setup fields
        setupFields(ctx);

        ctx.setUp(true);
    }

    /**
     * Resolves duplicate keys and timelines, either from pre-resolved key objects
     * or from the raw identity contexts supplied with the request. Fails if
     * nothing resolvable was supplied or if keys and timelines got out of sync.
     */
    private void setupDuplicates(MergeRequestContext ctx) {

        // 1. Already resolved and supplied as key objects.
        if (CollectionUtils.isNotEmpty(ctx.duplicateKeys())) {
            setupDuplicateKeys(ctx);
        // 2. Or not - resolve from the raw duplicate identity contexts.
        } else {
            setupDuplicateContexts(ctx);
        }

        Map<String, Timeline<OriginRecord>> duplicateTimelines = ctx.duplicateTimelines();
        List<RecordKeys> duplicatesKeys = ctx.duplicateKeys();

        // Every resolved key must have exactly one timeline; anything else means resolution failed.
        if (duplicatesKeys.isEmpty() || duplicateTimelines.isEmpty() || duplicatesKeys.size() != duplicateTimelines.size()) {
            throwNotFoundBySuppliedKeys(false, ctx);
        }
    }

    /**
     * Loads timelines for already resolved duplicate keys.
     * Skips the work entirely when a matching timeline map is already present on the context.
     * Every key must reference an active record; an inactive record aborts the merge.
     */
    private void setupDuplicateKeys(MergeRequestContext ctx) {

        List<RecordKeys> duplicatesKeys = ctx.duplicateKeys();
        Map<String, Timeline<OriginRecord>> duplicateTimelines = ctx.duplicateTimelines();

        // Assume all valid
        if (MapUtils.isNotEmpty(duplicateTimelines) && duplicateTimelines.size() == duplicatesKeys.size()) {
            return;
        }

        // Rebuild the timeline map from scratch (local reassignment; published to ctx below).
        duplicateTimelines = new HashMap<>();
        for (RecordKeys dKeys : duplicatesKeys) {

            Objects.requireNonNull(dKeys, "Record keys must not be null.");

            Timeline<OriginRecord> dTimeline = null;
            if (!dKeys.isActive()) {
                throwInvalidState(dKeys);
            }

            // Load the full (data-fetching) timeline by etalon id / LSN / shard.
            dTimeline = commonRecordsComponent.loadTimeline(GetRecordTimelineRequestContext.builder()
                    .etalonKey(dKeys.getEtalonKey().getId())
                    .lsn(dKeys.getEtalonKey().getLsn())
                    .shard(dKeys.getShard())
                    .fetchData(true)
                    .build());

            throwNullableTimeline(false, dTimeline);

            // Timelines are keyed by etalon id throughout this class.
            duplicateTimelines.put(dKeys.getEtalonKey().getId(), dTimeline);
        }

        ctx.duplicateTimelines(duplicateTimelines);
    }

    /**
     * Resolves duplicate keys and timelines from raw identity contexts.
     * Contexts that cannot be resolved to keys are collected as discarded
     * (incomplete) keys rather than failing the whole operation; an inactive
     * record, however, still aborts the merge.
     */
    private void setupDuplicateContexts(MergeRequestContext ctx) {

        Map<String, Timeline<OriginRecord>> duplicateTimelines = new HashMap<>();
        List<RecordKeys> discardedKeys = new ArrayList<>();
        for (AbstractRecordIdentityContext dCtx : ctx.getDuplicates()) {

            Timeline<OriginRecord> dTimeline = null;
            RecordKeys dKeys = dCtx.keys();
            if (Objects.isNull(dKeys)) {

                // No keys on the context yet - resolve them via a timeline load.
                dTimeline = commonRecordsComponent.loadTimeline(GetRecordTimelineRequestContext.builder(dCtx)
                        .fetchData(true)
                        .build());

                if (Objects.nonNull(dTimeline)) {
                    dKeys = dTimeline.getKeys();
                }
            }

            // Unresolvable context: remember it as discarded and move on.
            if (dKeys == null) {
                discardedKeys.add(RecordFactoryUtils.toIncompleteKeys(dCtx));
                continue;
            }

            if (!dKeys.isActive()) {
                throwInvalidState(dKeys);
            }

            // Keys were part of the supplied context, so the timeline was not loaded above.
            if (Objects.isNull(dTimeline)) {
                dTimeline = commonRecordsComponent.loadTimeline(GetRecordTimelineRequestContext.builder(dCtx)
                        .fetchData(true)
                        .build());
            }

            throwNullableTimeline(false, dTimeline);

            duplicateTimelines.put(dKeys.getEtalonKey().getId(), dTimeline);
        }

        // Publish results: keys are derived from the timelines to keep the two in sync.
        ctx.duplicateKeys(duplicateTimelines.values().stream().map(Timeline::<RecordKeys>getKeys).collect(Collectors.toList()));
        ctx.duplicateTimelines(duplicateTimelines);
        ctx.discardedKeys(discardedKeys);
    }

    /**
     * Resolves the master record's keys and timeline. If the context does not
     * identify a master directly, the last resolvable duplicate is promoted to
     * master (and removed from the duplicates). Validates that the master is
     * active, that at least one duplicate remains to merge with, and that the
     * master was not also supplied as a duplicate.
     */
    private void setupMaster(MergeRequestContext ctx) {

        Timeline<OriginRecord> timeline = ctx.currentTimeline();
        RecordKeys keys = ctx.keys();
        // NOTE(review): isValidRecordKey() presumably indicates the context carries a
        // usable master record identity - confirm against MergeRequestContext.
        if (Objects.isNull(keys) && ctx.isValidRecordKey()) {

            timeline = commonRecordsComponent.loadTimeline(GetRecordTimelineRequestContext.builder(ctx)
                .fetchData(true)
                .build());

            throwNullableTimeline(true, timeline);
            keys = timeline.getKeys();
        }

        // Select keys from duplicates if ctx.isValidRecordKey is true and keys not found
        if (keys == null) {

            // Scan the duplicates from the end; each visited entry is removed from the
            // list, and the first non-null key found is promoted to master.
            for (ListIterator<RecordKeys> it = ctx.duplicateKeys().listIterator(ctx.duplicateKeys().size()); it.hasPrevious(); ) {

                RecordKeys c = it.previous();
                it.remove();

                keys = c;
                if (Objects.nonNull(keys)) {
                    // Found - move the promoted record's timeline out of the duplicates map.
                    timeline = ctx.duplicateTimelines().remove(keys.getEtalonKey().getId());
                    break;
                }
            }
        }

        if (keys == null) {
            throwNotFoundBySuppliedKeys(true, ctx);
        }

        if (!keys.isActive()) {
            throwInvalidState(keys);
        }

        // Check duplicates keys state additionally - promotion above may have emptied the list.
        if (CollectionUtils.isEmpty(ctx.duplicateKeys())) {
            throwCannotExecute();
        }

        // Keys were supplied via properties, but the timeline was not
        if (Objects.isNull(timeline)) {

            timeline = commonRecordsComponent.loadTimeline(GetRecordTimelineRequestContext.builder(ctx)
                .fetchData(true)
                .build());

            throwNullableTimeline(true, timeline);
        }

        // Check master supplied twice (same etalon id as master and duplicate).
        if (ctx.duplicateTimelines().containsKey(keys.getEtalonKey().getId())) {
            throwSameIdTwice(keys);
        }

        ctx.currentTimeline(timeline);
        ctx.keys(keys);
    }

    /**
     * Fills in defaults the caller may have omitted: an empty merge change set
     * and the current timestamp.
     */
    private void setupFields(MergeRequestContext ctx) {

        if (Objects.isNull(ctx.changeSet())) {
            ctx.changeSet(new RecordMergeChangeSet());
        }

        if (Objects.isNull(ctx.timestamp())) {
            ctx.timestamp(new Date());
        }
    }

    /**
     * Throws when a timeline failed to load for keys that were expected to resolve.
     *
     * @param master true for the master record, false for a duplicate (message selection only)
     * @param t the timeline to check; null triggers the exception
     */
    private void throwNullableTimeline(boolean master, Timeline<OriginRecord> t) {

        if (Objects.isNull(t)) {
            final String message = master
                    ? "Master timeline must not be null for existing keys."
                    : "Duplicate timeline must not be null for existing keys.";

            LOGGER.warn(message);
            throw new DataProcessingException(message, DataExceptionIds.EX_DATA_RECORD_MERGE_NULLABLE_TIMELINE);
        }
    }

    /**
     * Throws when the same etalon id was supplied both as master and as duplicate.
     */
    private void throwSameIdTwice(RecordKeys keys) {
        final String message = "The key [{}] supplied twice - as master and duplicate.";
        LOGGER.warn(message, keys.getEtalonKey().getId());
        throw new DataProcessingException(message, DataExceptionIds.EX_DATA_RECORD_MERGE_MASTER_ID_IN_DUPLICATES, keys.getEtalonKey().getId());
    }

    /**
     * Throws when no duplicates remain to merge with the master.
     */
    private void throwCannotExecute() {

        final String message = "Cannot execute merge operation. Not enough records to merge.";
        LOGGER.warn(message);
        throw new DataProcessingException(message, DataExceptionIds.EX_DATA_RECORD_MERGE_CANNOT_EXECUTE);
    }

    /**
     * Throws when a record's status makes it ineligible for merge.
     * NOTE(review): getOriginKey() is dereferenced unconditionally here - confirm it
     * can never be null for keys reaching this point (e.g. etalon-only resolution).
     */
    private void throwInvalidState(RecordKeys keys) {

        final String message = "Record with keys etalon id: [{}], origin id: [{}] has incorrect state for merge - status: [{}]";
        LOGGER.warn(message,
                keys.getEtalonKey().getId(),
                keys.getOriginKey().getId(),
                keys.getEtalonKey().getStatus());
        throw new DataProcessingException(message, DataExceptionIds.EX_DATA_RECORD_MERGE_VALIDATE_INCORRECT_STATE,
                keys.getEtalonKey().getId(),
                keys.getOriginKey().getId(),
                keys.getEtalonKey().getStatus());
    }

    /**
     * Throws when the master (master == true), or none of the duplicates
     * (master == false), could be found by the supplied identity spec.
     */
    private void throwNotFoundBySuppliedKeys(boolean master, MergeRequestContext ctx) {

        final String message = master
                ? "Master record not found by supplied spec etalon id: [{}], origin id [{}], external id [{}], source system [{}], name [{}]"
                : "None of the duplicates were found for context spec etalon id: [{}], origin id [{}], external id [{}], source system [{}], name [{}]";

        LOGGER.warn(message,
                ctx.getEtalonKey(),
                ctx.getOriginKey(),
                ctx.getExternalId(),
                ctx.getSourceSystem(),
                ctx.getEntityName());
        throw new DataProcessingException(message, DataExceptionIds.EX_DATA_RECORD_MERGE_NOT_FOUND_BY_SUPPLIED_KEYS,
                ctx.getEtalonKey(),
                ctx.getOriginKey(),
                ctx.getExternalId(),
                ctx.getSourceSystem(),
                ctx.getEntityName());
    }
}
